package hotnet.filter;

import hotnet.future.WriteFuture;
import hotnet.session.IoSession;

import java.util.EnumSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorFilter extends IoFilterAdapter {
	private EnumSet<IoEventType> eventTypes;
	private Executor executor;
	private boolean manageableExecutor;
	
	private static final int DEFAULT_MAX_POOL_SIZE = 16;
	private static final int BASE_THREAD_NUMBER = 0;
	private static final long DEFAULT_KEEPALIVE_TIME = 30;	// 30s
	
	private static final IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] { IoEventType.EXCEPTION_CAUGHT,
        IoEventType.MESSAGE_RECEIVED, IoEventType.MESSAGE_SENT, IoEventType.SESSION_CLOSED,
        IoEventType.SESSION_IDLE, IoEventType.SESSION_OPENED };
	
	public ExecutorFilter(String name) {
		super(name);
		executor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS);
		manageableExecutor = true;
		
		init(executor, manageableExecutor, eventTypes);
	}

	private void init(Executor executor, boolean manageableExecutor, EnumSet<IoEventType> eventTypes) {
		if (executor == null) {
			throw new IllegalArgumentException("executor null");
		}
		
		this.manageableExecutor = manageableExecutor;
		initEventTypes(eventTypes);
	}

	private void initEventTypes(EnumSet<IoEventType> eventTypes) {
		if (eventTypes == null || eventTypes.size() == 0) {
			eventTypes = EnumSet.of(DEFAULT_EVENT_SET[0], DEFAULT_EVENT_SET);
		} 
		
		this.eventTypes = eventTypes;
	}
	
    private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
        // Create a new Executor
        Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit);

        return executor;
    }
   
    @Override
    public void destory() {
    	if (manageableExecutor) {
    		((ExecutorService)this.executor).shutdown();
    	}
    }
    
    private void fireEvent(IoFilterEvent event) {
		executor.execute(event);
	}
    
    @Override
    public void sessionClosed(NextFilter nextFilter, IoSession ioSession) {
    	if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
    		IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED,	ioSession, null);
    		
    		fireEvent(event);
    	} else {
    		nextFilter.sessionClosed(ioSession);
    	}
    }
    
    @Override
    public void sessionOpened(NextFilter nextFilter, IoSession ioSession) {
    	if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
    		IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED,	ioSession, null);
    		
    		fireEvent(event);
    	} else {
    		nextFilter.sessionClosed(ioSession);
    	}
    }
    
    @Override
    public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) {
        if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.EXCEPTION_CAUGHT, session, cause);
            fireEvent(event);
        } else {
            nextFilter.exceptionCaught(session, cause);
        }
    }
	
    @Override
    public void messageReceived(NextFilter nextFilter, IoSession session, Object message) {
        if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message);
            fireEvent(event);
        } else {
            nextFilter.messageReceived(session, message);
        }
    }
    
    @Override
    public void messageSent(NextFilter nextFilter, IoSession session, WriteFuture writeFuture) {
        if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT, session, writeFuture);
            fireEvent(event);
        } else {
            nextFilter.messageSent(session, writeFuture);
        }
    }
    
    @Override
    public final void filterWrite(NextFilter nextFilter, IoSession session, WriteFuture writeFuture) {
        if (eventTypes.contains(IoEventType.WRITE)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session, writeFuture);
            fireEvent(event);
        } else {
            nextFilter.filterWrite(session, writeFuture);
        }
    }
    
    @Override
    public final void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
        if (eventTypes.contains(IoEventType.CLOSE)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session, null);
            fireEvent(event);
        } else {
            nextFilter.filterClose(session);
        }
    }
}
